/*
 * Copyright (C) 2019 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.dataflow.cdc.applier;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * The {@link CdcToBigQueryChangeApplierPipeline} consumes change data corresponding to changes
 * in a database. This data is consumed from a group of Pubsub topics (one topic for each table in
 * the external database); then it is processed and inserted into BigQuery.
 *
 * For each table in the external database, the {@link CdcToBigQueryChangeApplierPipeline} will
 * produce two BigQuery tables:
 *
 * 1) One changelog table, with the full sequence of changes made to the table in the external
 *    database. This table is also referred to as the Staging Table or Changelog Table.
 * 2) One <b>replica</b> table, which is a replica of the table in the external database. This
 *    replica table is built periodically by issuing MERGE statements to BigQuery that synchronize
 *    the tables using the changelog table. This table is referred to as the Replica Table.
 *
 * The change data is intended to be generated by a Debezium-based connector, which watches the
 * changelog from the external database, formats the data into Beam {@link Row} format, updates
 * Data Catalog with schema information for each table, and pushes the change data to PubSub.
 */
public class CdcToBigQueryChangeApplierPipeline {

  public static final Integer SECONDS_PER_DAY = 24 * 60 * 60;
  public static final Integer MAX_BQ_MERGES_PER_TABLE_PER_DAY = 1000;

  /**
   * The minimum allowed period between MERGE statements issued for a single table.
   * Derived from BigQuery's quota of {@link #MAX_BQ_MERGES_PER_TABLE_PER_DAY} MERGE
   * statements per table per day, plus a 10% safety margin. The division is performed
   * in floating point so the margin is not lost to integer truncation.
   */
  public static final Long MINIMUM_UPDATE_FREQUENCY_SECONDS =
      Math.round(SECONDS_PER_DAY / (double) MAX_BQ_MERGES_PER_TABLE_PER_DAY * 1.10);

  private static final Logger LOG = LoggerFactory.getLogger(
      CdcToBigQueryChangeApplierPipeline.class);

  /**
   * The {@link CdcApplierOptions} class provides the custom execution options passed by the
   * executor at the command-line. Exactly one of {@code inputTopics} or
   * {@code inputSubscriptions} must be supplied.
   */
  public interface CdcApplierOptions extends PipelineOptions {

    @Description("Comma-separated list of PubSub topics to where CDC data is being pushed.")
    String getInputTopics();
    void setInputTopics(String topic);

    @Description("Comma-separated list of PubSub subscriptions where CDC data is available.")
    String getInputSubscriptions();
    void setInputSubscriptions(String subscriptions);

    @Description("The BigQuery dataset where Staging / Change Log tables are to be kept.")
    String getChangeLogDataset();
    void setChangeLogDataset(String dataset);

    @Description("The BigQuery dataset where the Replica tables are to be kept.")
    String getReplicaDataset();
    void setReplicaDataset(String dataset);

    @Description("How often the pipeline will issue updates to the BigQuery replica table.")
    Integer getUpdateFrequencySecs();
    void setUpdateFrequencySecs(Integer frequency);
  }

  /**
   * Appends the BigQuery change-applier stage to {@code input}, which writes both the
   * changelog (staging) table and the replica table for a single source table.
   *
   * @param transformPrefix prefix used to build unique step names (the source topic name)
   * @param options pipeline options carrying dataset names and the MERGE frequency
   * @param input change records, already decoded into Beam {@link Row}s
   */
  private static PDone buildIngestionPipeline(
      String transformPrefix,
      CdcApplierOptions options,
      PCollection<Row> input) {
    return input
        .apply(String.format("%s/ApplyChangesToBigQuery", transformPrefix),
            BigQueryChangeApplier.of(
                options.getChangeLogDataset(),
                options.getReplicaDataset(),
                options.getUpdateFrequencySecs(),
                options.as(GcpOptions.class).getProject()));
  }

  /**
   * Immutable triple binding a PubSub topic, its subscription (may be {@code null}
   * when the pipeline reads directly from the topic), and the table's Beam schema.
   */
  static class TopicSubscriptionSchema {
    final String topic;
    final String subscription;
    final Schema schema;

    TopicSubscriptionSchema(String topic, String subscription, Schema schema) {
      this.topic = topic;
      this.subscription = subscription;
      this.schema = schema;
    }
  }

  /**
   * Resolves the (topic, subscription, schema) triples the pipeline will read from.
   *
   * <p>If {@code subscriptions} is provided, each topic is looked up from its
   * subscription; otherwise the topics are used directly and the subscription slot is
   * left {@code null}. Every topic must have a non-empty schema registered, or a
   * {@link RuntimeException} is thrown.
   *
   * @param gcpProject the GCP project containing the PubSub resources
   * @param topics comma-separated topic names, or {@code null}
   * @param subscriptions comma-separated subscription names, or {@code null}
   */
  static List<TopicSubscriptionSchema> buildTopicSubscriptionSchemas(
      final String gcpProject, String topics, String subscriptions) {
    List<String> topicList;
    List<String> subscriptionList;
    List<Schema> schemaList;
    if (subscriptions != null) {
      subscriptionList = Arrays.asList(subscriptions.split(","));
      topicList = subscriptionList.stream()
          .map(s -> {
            try {
              return PubsubUtils.getPubSubTopicFromSubscription(gcpProject, s).getTopic();
            } catch (IOException e) {
              // Surface lookup failures immediately; the pipeline cannot start
              // without knowing which topic backs each subscription.
              throw new RuntimeException(e);
            }
          })
          .collect(Collectors.toList());
    } else {
      topicList = Arrays.asList(topics.split(","));
      // No subscriptions supplied: pair each topic with a null subscription so the
      // result lists stay index-aligned.
      subscriptionList = topicList.stream()
          .map(t -> (String) null)
          .collect(Collectors.toList());
    }

    LOG.info("Topic list is: {}", topicList);
    LOG.info("Subscription list is: {}", subscriptionList);

    schemaList = topicList.stream()
        .map(topic -> PubsubUtils.getBeamSchemaForTopic(gcpProject, topic))
        .map(schema -> {
          if (schema == null || schema.getFields().isEmpty()) {
            throw new RuntimeException("Received a null or empty schema. Can not continue");
          } else {
            return schema;
          }
        })
        .collect(Collectors.toList());

    LOG.info("Schema list is: {}", schemaList);

    List<TopicSubscriptionSchema> result = new ArrayList<>();
    for (int i = 0; i < topicList.size(); i++) {
      result.add(new TopicSubscriptionSchema(
          topicList.get(i),
          subscriptionList.get(i),
          schemaList.get(i)));
    }
    return result;
  }

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) throws IOException {
    CdcApplierOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(CdcApplierOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   * @throws IllegalArgumentException if neither or both of the input topics/subscriptions
   *     are provided, or if the update frequency is missing or below the BigQuery quota
   *     minimum.
   */
  private static PipelineResult run(CdcApplierOptions options) {

    // Exactly one of the two input sources must be configured. The original check
    // only rejected the both-provided case (with a misleading message); the
    // neither-provided case would otherwise NPE later in buildTopicSubscriptionSchemas.
    if (options.getInputTopics() != null && options.getInputSubscriptions() != null) {
      throw new IllegalArgumentException(
          "Only one of inputTopics or inputSubscriptions may be provided, not both");
    }
    if (options.getInputTopics() == null && options.getInputSubscriptions() == null) {
      throw new IllegalArgumentException(
          "Either an input topic or an input subscription must be provided");
    }

    // Null-check before unboxing, and keep the error message in sync with the
    // computed minimum rather than a hard-coded value.
    if (options.getUpdateFrequencySecs() == null
        || options.getUpdateFrequencySecs() < MINIMUM_UPDATE_FREQUENCY_SECONDS) {
      throw new IllegalArgumentException(String.format(
          "BigQuery supports at most %d MERGE statements per table per day. "
              + "Please select an updateFrequencySecs of %d or more to fit this limit",
          MAX_BQ_MERGES_PER_TABLE_PER_DAY, MINIMUM_UPDATE_FREQUENCY_SECONDS));
    }

    Pipeline p = Pipeline.create(options);

    List<TopicSubscriptionSchema> readSourceSchemas = buildTopicSubscriptionSchemas(
        options.as(GcpOptions.class).getProject(),
        options.getInputTopics(),
        options.getInputSubscriptions());

    readSourceSchemas.forEach(rss -> {
      String transformTopicPrefix = rss.topic;

      // Read either directly from the topic or from the provided subscription.
      PCollection<PubsubMessage> pubsubData;
      if (rss.subscription == null) {
        pubsubData = p.apply(
            String.format("%s/Read Updates from PubSub", transformTopicPrefix),
            PubsubIO.readMessages()
                .fromTopic(String.format(
                    "projects/%s/topics/%s",
                    options.as(GcpOptions.class).getProject(), rss.topic)));
      } else {
        pubsubData = p.apply(
            String.format("%s/Read Updates from PubSub", transformTopicPrefix),
            PubsubIO.readMessages().fromSubscription(String.format(
                "projects/%s/subscriptions/%s",
                options.as(GcpOptions.class).getProject(), rss.subscription)));
      }

      PCollection<Row> collectionOfRows = pubsubData
          .apply(String.format("%s/Extract payload", transformTopicPrefix),
              MapElements.into(TypeDescriptor.of(byte[].class))
                  .via(PubsubMessage::getPayload))
          .apply(
              String.format("%s/Decode", transformTopicPrefix),
              DecodeRows.withSchema(rss.schema));

      buildIngestionPipeline(transformTopicPrefix, options, collectionOfRows);
    });

    PipelineResult result = p.run();
    return result;
  }

}

